#!/usr/node/bin/node --abort_on_uncaught_exception
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

/*
 * Copyright (c) 2019, Joyent, Inc.
 */

/*
 * The Ur agent is an AMQP client that connects to the rabbitmq zone(s) running
 * in the DC and waits for commands. It is used for bootstrapping and initial
 * setup of the stack and is also used for running automated commands on some or
 * all CNs in a given DC.
 *
 * At startup Ur sends the output from /usr/bin/sysinfo in order to identify
 * itself to CNAPI.
 *
 */

var amqp = require('./amqp-plus');
var bunyan = require('bunyan');
var execFile = require('child_process').execFile;
var fs = require('fs');
var netconfig = require('triton-netconfig');
var path = require('path');
var util = require('util');

/* Global Variables (shared by all Ur connection instances) */

// Size in bytes (5 MiB) passed as maxBuffer when executing commands.
var MAX_BUFFER = 5 * 1024 * 1024;
// Any non-empty UR_DEBUG value in the environment switches on debug mode.
var debug = Boolean(process.env['UR_DEBUG']);
// Value handed to the AMQP connection as its `heartbeat` setting.
var heartbeatSeconds = 30;
// How many times we try to get the exclusive request queue before giving up.
var maxReconnectAttempts = 3;

function UrAgent(options) {
    var self = this;

    self.options = options || {};
    self.reconnectAttempts = 0;
    self.setupStateFile = '/var/lib/setup.json';
    self.urStartupFilePath = '/tmp/.ur-startup';
    self.mockCNServerUUID = null;

    // Constructor body proper: executed once for every new instance.
    (function init() {
        // Populate self (credentials, file paths, logger) from the
        // environment and the options that were passed in.
        applyOptions();

        // Then open the AMQP connection straight away.
        connect();
    }());

    /*
     * Fill in the per-instance settings. AMQP credentials come from the
     * AMQP_* environment variables (falling back to defaults), while the file
     * paths, the mock server UUID and the logger can be supplied through
     * self.options.
     */
    function applyOptions() {
        var env = process.env;
        var opts = self.options;
        var overridable = [
            'sysinfoFile',
            'setupStateFile',
            'urStartupFilePath',
            'mockCNServerUUID'
        ];

        self.creds = {
            heartbeat: heartbeatSeconds,
            host: env['AMQP_HOST'] || 'localhost',
            port: env['AMQP_PORT'] || 5672,
            login: env['AMQP_LOGIN'] || 'guest',
            password: env['AMQP_PASSWORD'] || 'guest',
            vhost: env['AMQP_VHOST'] || '/'
        };

        // Only properties set directly on the options object are honoured.
        overridable.forEach(function (name) {
            if (opts.hasOwnProperty(name)) {
                self[name] = opts[name];
            }
        });

        if (opts.log) {
            self.log = opts.log;
        } else {
            self.log = bunyan.createLogger({name: 'ur', level: 'debug'});
        }
    }

    /*
     * Create the 'ur.ping.<nodeUUID>' queue. Once it is open we subscribe to
     * it and bind it to amq.topic; every ping received is answered with a
     * message carrying the request id and the current time.
     */
    function setupPingQueue() {
        var prefix = 'ur';
        var pingQueue = self.connection.queue(
            prefix + '.ping.' + self.nodeUUID);

        // Reply to one ping, then ask for the next message.
        function onPing(msg, headers, deliveryInfo) {
            var replyKey;
            var body;

            self.log.info('Received ping message');

            replyKey = prefix + '.ack' + msg.client_id + '.' + self.nodeUUID;
            body = {
                req_id: msg.id,
                timestamp: new Date().toISOString()
            };

            self.log.info('Publishing ping reply to ' + replyKey);
            self.exchange.publish(replyKey, body);

            pingQueue.shift();
        }

        function onOpen(messageCount, consumerCount) {
            pingQueue.subscribe({ ack: true }, onPing);
            pingQueue.bind('amq.topic', prefix + '.ping.' + self.nodeUUID);
        }

        pingQueue.addListener('open', onOpen);
    }

    /*
     * Read self.setupStateFile and parse it as JSON.
     *
     * callback(error) is called when the file cannot be read or does not
     * contain valid JSON; otherwise callback(null, setupState) is called with
     * the parsed contents. Failures are logged before being reported.
     */
    function readSetupStateFile(callback) {
        fs.readFile(self.setupStateFile, function (error, data) {
            var setupState;

            if (error) {
                self.log.error('Error reading file %s: %s', self.setupStateFile,
                    error.message);
                callback(error);
                return;
            }

            try {
                setupState = JSON.parse(data.toString());
            }
            catch (e) {
                // `error` is null on this path, so the exception raised by
                // JSON.parse is what must be logged and handed back.
                self.log.error('could not parse %s: %s', self.setupStateFile,
                    e.message);
                callback(e);
                return;
            }

            callback(null, setupState);
        });
    }

    /*
     * This function starts a poller which watches for changes to the
     * setupStateFile file. The point of this is to ensure that we notice
     * when an unsetup (eg. freshly booted) CN gets setup as CNAPI will
     * be watching for that transition.
     *
     * Every check republishes the current sysinfo to
     * 'ur.sysinfo.<nodeUUID>.<random id>'. Checks repeat every pollInterval ms
     * for as long as the setup state file is missing, cannot be read/parsed,
     * or does not say `complete: true`; once it does, polling stops.
     */
    function setupStateChecker() {
        var pollInterval = 60 * 1000; // frequency to poll in ms

        // Drop any poller left over from an earlier call before starting.
        clearInterval(self.broadcastSetupStateInterval);
        self.broadcastSetupStateInterval = null;

        // Start the poller
        check();

        // Reload sysinfo, remember it on self and publish it.
        function publish() {
            loadSysinfo(function (err, exitStatus, stdout, stderr) {
                var sysinfo;

                if (err) {
                    self.log.error('sysinfo error: ' + stderr.toString());
                    dump(arguments);
                    return;
                }

                // Bad sysinfo output must not take the whole agent down
                // from inside this callback; skip this round instead.
                try {
                    sysinfo = JSON.parse(stdout);
                } catch (e) {
                    self.log.error('could not parse sysinfo output: '
                        + e.message);
                    return;
                }

                self.sysinfo = sysinfo;
                self.exchange.publish(pjoin('ur.sysinfo', self.nodeUUID,
                    genId()), self.sysinfo);
            });
        }

        // Arm the periodic re-check unless it is already running.
        function startPolling() {
            if (self.broadcastSetupStateInterval) {
                return;
            }

            self.log.debug('Server isn\'t set up, initiating polling');
            self.broadcastSetupStateInterval = setInterval(
                function _setupCheck() {
                    // Do another check, it's been pollInterval ms
                    check();
                },
                pollInterval);
        }

        function check() {
            // We first check if the file exists, if it doesn't we'll keep
            // polling until it does.
            fs.exists(self.setupStateFile, function (exists) {
                if (!exists) {
                    // setupStateFile does not exist, so we'll start a poller so
                    // that when it gets created we'll start reading the setup
                    // state.
                    startPolling();
                    publish();
                    return;
                }

                readSetupStateFile(function (error, setupState) {
                    if (!error && setupState && setupState.complete === true) {
                        self.log.debug('Server is setup!');
                        clearInterval(self.broadcastSetupStateInterval);
                        self.broadcastSetupStateInterval = null;
                        publish();
                        return;
                    }

                    // The file is unreadable, unparseable or says setup has
                    // not finished yet (readSetupStateFile logs failures
                    // itself). Keep polling so that completion is noticed.
                    startPolling();
                    publish();
                });
            });
        }
    }

    /*
     * 'ready' handler for the AMQP connection. Loads sysinfo, derives this
     * node's UUID and admin MAC address from it, opens the amq.topic exchange
     * used for outbound messages, starts the setup state checker and finally
     * binds the request queue.
     *
     * Throws (from within the sysinfo callback) when sysinfo has no "UUID" or
     * no admin NIC can be found.
     */
    function onReady() {
        loadSysinfo(function onSysinfo(err, status, out, errOut) {
            var nic;
            var uuid;

            if (err) {
                self.log.error('sysinfo error: ' + errOut.toString());
                dump(arguments);
                return;
            }

            // sysinfo prints a JSON object.
            self.sysinfo = JSON.parse(out);

            // The UUID uniquely identifies this machine on AMQP.
            uuid = self.sysinfo['UUID'];
            self.nodeUUID = uuid;
            if (!uuid) {
                throw new Error('Could not find "UUID" in `sysinfo` output.');
            }

            // All outbound messages are published through this exchange.
            self.exchange = self.connection.exchange('amq.topic',
                { type: 'topic' });
            setupStateChecker();

            nic = netconfig.adminNicFromSysinfo(self.sysinfo);
            if (!nic) {
                throw new Error('No admin NIC detected.');
            }
            self.adminMac = nic['MAC Address'];

            bindQueue();
        });
    }

    /*
     * Create the AMQP connection from self.creds, arrange for onReady to run
     * the first time the connection reports 'ready', and kick it off.
     */
    function connect()
    {
        var conn = new amqp.Connection(self.creds, { log: self.log });

        self.connection = conn;
        conn.once('ready', onReady);
        conn.reconnect();
    }

    /*
     * Create the exclusive request queue
     * 'ur.request.<nodeUUID>.<adminMac>' and install its handlers.
     *
     * On 'open' the queue is bound to the execute and broadcast routing keys
     * and subscribed with onExecute; if self.urStartupFilePath does not exist
     * yet it is created and a one-off 'ur.startup.<nodeUUID>' message with the
     * sysinfo is published.
     *
     * On a 403/405 'error' the connection is ended after five seconds, up to
     * maxReconnectAttempts times; after that the process exits.
     */
    function bindQueue() {
        var queueName = 'ur.request.' + self.nodeUUID + '.' + self.adminMac;

        self.queue = self.connection.queue(queueName, { exclusive: true });
        self.queue.on('error', onQueueError);
        self.queue.on('open', onQueueOpen);

        function onQueueError(e) {
            var AMQP_ACCESS_REFUSED = 403;
            var AMQP_RESOURCE_LOCKED = 405;
            var lockRelated = (e.code === AMQP_RESOURCE_LOCKED
                || e.code === AMQP_ACCESS_REFUSED);

            // Anything other than the two codes above is only logged here.
            if (!lockRelated) {
                self.log.error('AMQP Queue Error received (retrying): '
                    + e.message);
                return;
            }

            ++self.reconnectAttempts;
            if (self.reconnectAttempts > maxReconnectAttempts) {
                self.log.error('Could not acquire exclusive ur queue after '
                    + self.reconnectAttempts + ' attempts.');

                // XXX we shouldn't exit!
                process.exit(1);
            }

            self.log.error('Error: %s (Attempt: %s)',
                e.message,
                self.reconnectAttempts + '/' + maxReconnectAttempts);

            // NOTE(review): an empty message is published to our own execute
            // key here; presumably to provoke the current queue holder --
            // confirm the intent.
            self.exchange.publish(pjoin('ur.execute', self.nodeUUID, genId()),
                {});

            setTimeout(function () {
                self.connection.end();
            }, 5000);
        }

        function onQueueOpen() {
            var executeKey = 'ur.execute.' + self.nodeUUID;

            self.reconnectAttempts = 0;
            setupPingQueue();

            self.queue.bind('amq.topic',
                executeKey + '-' + self.adminMac + '.*');
            self.queue.bind('amq.topic', executeKey + '.*');
            self.queue.bind('amq.topic', 'ur.broadcast.*.*');
            self.queue.subscribe({ ack: true }, onExecute);

            fs.exists(self.urStartupFilePath, announceStartup);
        }

        // Send the startup message only if the marker file is not there yet.
        function announceStartup(exists) {
            if (exists) {
                return;
            }

            fs.writeFile(self.urStartupFilePath, '', function (error) {
                var routingKey;

                if (error) {
                    throw error;
                }

                routingKey = 'ur.startup.' + self.nodeUUID;
                self.log.info('Sending startup message to routing key '
                    + routingKey);
                dump(self.sysinfo);
                self.exchange.publish(routingKey, self.sysinfo);
            });
        }
    }

    /*
     * Handler for every message delivered on the request queue.
     *
     * The routing key is split on '.'; its second component selects the kind
     * of request and its fourth is the request id used in the reply's routing
     * key. 'broadcast' requests are answered by subject (only 'sysinfo' is
     * known); anything else is dispatched on m.type ('file' or 'script') and
     * answered through replyFn.
     */
    function onExecute(m, headers, deliveryInfo) {
        var routingKey = deliveryInfo.routingKey;
        var parts = routingKey.split('.', 4);
        var kind = parts[1];
        var subject = parts[2];
        var reqID = parts[3];

        dump(m);
        // Ask for the next message right away, before this one is handled.
        self.queue.shift();

        self.log.info({ req_id: reqID }, 'Received a message to routing key: '
            + routingKey);

        if (kind === 'broadcast') {
            if (subject === 'sysinfo') {
                loadSysinfo(onBroadcastSysinfo);
            } else {
                self.log.info({ req_id: reqID}, 'unknown broadcast key "%s"',
                    subject);
            }
            return;
        }

        if (m.type === 'file') {
            executeFile(m.file, m.env, m.args, replyFn);
        } else if (m.type === 'script') {
            executeScript(m.script, m.env, m.args, replyFn);
        } else {
            self.log.warn('Unknown message type: %s', m.type);
        }

        // Reply to a broadcast sysinfo request with freshly loaded sysinfo.
        function onBroadcastSysinfo(error, exitStatus, stdout, stderr) {
            if (error) {
                self.log.error({ req_id: reqID},
                    'broadcast sysinfo error: ' + stderr.toString());
                return;
            }
            publishBroadcastReply(reqID, JSON.parse(stdout));
        }

        // Publish the outcome of an execution and reboot if it asked for it.
        function replyFn(error, stdout, stderr) {
            // An exit status of 113 from the executable indicates that we want
            // the machine to reboot after we reply.
            var REBOOT_STATUS = 113;
            var exitStatus = error.code;
            var wantsReboot = (exitStatus === REBOOT_STATUS);
            var publishExitStatus = exitStatus;

            // Both "reboot please" and "no code at all" are reported as 0.
            if (wantsReboot || exitStatus === undefined) {
                publishExitStatus = 0;
            }

            publishExecuteReply(reqID, { code: publishExitStatus }, stdout,
                stderr, function afterReply() {
                    if (!wantsReboot) {
                        return;
                    }

                    // Give the reply a moment to leave the machine before
                    // the connection is closed and the reboot starts.
                    setTimeout(function () {
                        self.connection.end();
                        if (debug) {
                            fakeReboot();
                        } else {
                            reboot();
                        }
                    }, 1000);
                });
        }
    }

    /*
     * Ask the system to reboot by running
     * `/usr/sbin/shutdown -y -g 0 -i 6`. If that command fails an Error
     * carrying its stderr is thrown; if it succeeds, forceReboot() is
     * scheduled five minutes later as a fallback.
     */
    function reboot() {
        var FORCE_DELAY_MS = 5 * 60 * 1000;
        var shutdownArgs = ['-y', '-g', '0', '-i', '6'];

        function onShutdown(error, stdout, stderr) {
            if (error) {
                throw new Error(stderr.toString());
            }

            setTimeout(function () {
                forceReboot();
            }, FORCE_DELAY_MS);
        }

        execFile('/usr/sbin/shutdown', shutdownArgs, onShutdown);
    }

    /*
     * Run /usr/sbin/reboot with no arguments. The outcome of the command is
     * deliberately ignored.
     */
    function forceReboot() {
        var noArgs = [];

        execFile('/usr/sbin/reboot', noArgs, function ignoreResult() {
            // Wait for reboot
        });
    }

    /*
     * Debug-mode stand-in for reboot(): log a message and exit the process
     * with status 0 instead of touching the machine.
     */
    function fakeReboot() {
        var EXIT_OK = 0;

        self.log.info('I am so rebooting right now.');
        process.exit(EXIT_OK);
    }

    // Execute a file and then call callback with captured output and exit
    // status code.
    //
    // callback(error, stdout, stderr) receives 0 as `error` on success and
    // the Error reported by execFile() otherwise; stdout and stderr are
    // passed as strings.
    //
    // The child runs with `env` when one is given, else with the agent's own
    // environment, plus MOCKCN_SERVER_UUID when self.mockCNServerUUID is set.
    function executeFile(filename, env, args, callback) {
        var base_env = env || process.env;
        var new_env = {};

        self.log.info('Executing file: ' + filename);

        // Build the child's environment in a fresh object so that adding
        // MOCKCN_SERVER_UUID never leaks into process.env or into the env
        // object owned by the caller.
        Object.keys(base_env).forEach(function (name) {
            new_env[name] = base_env[name];
        });

        if (self.mockCNServerUUID) {
            new_env['MOCKCN_SERVER_UUID'] = self.mockCNServerUUID;
        }

        try {
            execFile(
                filename,
                args || [],
                { env: new_env, maxBuffer: MAX_BUFFER },
                onExit);
        } catch (e) {
            // execFile() can reject its arguments synchronously (e.g. a
            // request without a usable file name); report that to the
            // requester instead of letting it escape the message handler.
            self.log.error('Could not execute %s: %s', filename, e.message);
            callback(e, '', e.message);
        }

        function onExit(error, stdout, stderr) {
            stdout = stdout.toString();
            stderr = stderr.toString();

            // Logged for failures as well, so that a failing command leaves
            // its status and output in the agent log.
            self.log.info('Exit status for %s was %s.',
                filename, error ? error.code : 0);
            self.log.info('STDOUT:');
            self.log.info(stdout);
            self.log.info('STDERR:');
            self.log.info(stderr);

            if (error) {
                callback(error, stdout, stderr);
                return;
            }

            callback(0, stdout, stderr);
        }
    }

    // Execute a script string.
    //
    // The script is written to a new temporary file (see tmpFilename()),
    // made executable for the owner only, run through executeFile() and then
    // removed again. callback receives exactly what executeFile() reports;
    // when the file cannot be prepared, callback(error, '', error.message)
    // is called and nothing is executed.
    function executeScript(script, env, args, callback) {
        var filename = tmpFilename();
        var scriptMode = parseInt('0700', 8);

        self.log.info('Executing script: ' + script);
        self.log.info('Writing file ' + filename);

        // Report a preparation failure, optionally removing our file first.
        function fail(error, removeFile) {
            self.log.error('Could not prepare script file %s: %s', filename,
                error.message);

            if (!removeFile) {
                callback(error, '', error.message);
                return;
            }

            fs.unlink(filename, function () {
                callback(error, '', error.message);
            });
        }

        // Write file, 'chmod 700' it, run it, delete file.
        //
        // 'wx' makes the write fail when something already exists under
        // that name, so a file or symlink planted in the temporary directory
        // is never overwritten and then executed.
        fs.writeFile(filename, script, { mode: scriptMode, flag: 'wx' },
            function (werror) {

            if (werror) {
                // A pre-existing file is not ours to delete.
                fail(werror, werror.code !== 'EEXIST');
                return;
            }

            // The creation mode is subject to the umask, so set it explicitly.
            fs.chmod(filename, scriptMode, function (cerror) {
                if (cerror) {
                    fail(cerror, true);
                    return;
                }

                executeFile(filename, env, args,
                    function (error, stdout, stderr) {

                    var execArgs = Array.prototype.slice.apply(arguments);

                    self.log.info('Execute complete (%s)', filename);
                    fs.unlink(filename, function (ulerror) {
                        if (ulerror) {
                            self.log.info('Error unlinking file (%s)',
                                filename);
                        }

                        callback.apply(undefined, execArgs);
                    });
                });
            });
        });
    }

    // Publish an 'ur.execute-reply'-type message to AMQP.
    //
    // The reply goes to 'ur.execute-reply.<nodeUUID>.<reqID>' and carries
    // exit_status, stdout and stderr. exit_status is error.code when `error`
    // has one (0 included), 1 for any other truthy `error`, and 0 when there
    // is no error. callback() is invoked once the reply has been handed to
    // the exchange.
    function publishExecuteReply(reqID, error, stdout, stderr, callback) {
        var exitStatus = 0;
        var reply;
        var routingKey = 'ur.execute-reply.' + self.nodeUUID + '.' + reqID;

        self.log.info({ err: error }, 'publishExecuteReply return');

        if (error) {
            if (error.code === 0 || error.code) {
                exitStatus = error.code;
            } else {
                exitStatus = 1;
                // Only substitute the error's message when it has one: an
                // error without code and message (e.g. { code: null }) must
                // not wipe out the stderr that was actually captured.
                if (error.message) {
                    stderr = error.message;
                }
            }
        }

        reply = {
            exit_status: exitStatus,
            stdout:      stdout,
            stderr:      stderr
        };

        self.log.info('Publishing execute-reply to request ' + reqID + ':');
        dump(reply);
        self.exchange.publish(routingKey, reply);
        callback();
    }

    // Answer a broadcast request: `reply` is published unchanged to
    // 'ur.execute-reply.<nodeUUID>.<reqID>'.
    function publishBroadcastReply(reqID, reply) {
        var routingKey = 'ur.execute-reply.' + self.nodeUUID;

        routingKey += '.' + reqID;

        self.log.info(
            'Publishing execute-reply to broadcast request ' + reqID + ':');
        dump(reply);

        self.exchange.publish(routingKey, reply);
    }

    // Obtain the sysinfo JSON text and hand it to
    // callback(error, exitStatus, stdout, stderr).
    //
    // When self.sysinfoFile is set its contents are read and passed on as
    // they come from fs.readFile(); otherwise the sysinfo command is run
    // (./sysinfo-fake.sh in debug mode) and its trimmed output is passed on.
    function loadSysinfo(callback) {
        var sysinfoCmd = '/usr/bin/sysinfo';

        if (debug) {
            sysinfoCmd = './sysinfo-fake.sh';
        }

        if (!self.sysinfoFile) {
            execFile(sysinfoCmd, [], function (execErr, out, errOut) {
                if (execErr) {
                    callback(new Error(errOut), execErr, out, errOut);
                    return;
                }

                callback(undefined, execErr, out.toString().trim(),
                    errOut.toString().trim());
            });
            return;
        }

        fs.readFile(self.sysinfoFile, function (readErr, contents) {
            if (readErr) {
                callback(readErr, 1, '', readErr.message);
                return;
            }

            callback(null, 0, contents, '');
        });
    }

    // Build a throwaway file name of the form /tmp/ur-<random id>.
    function tmpFilename() {
        var prefix = '/tmp/ur-';

        return prefix + genId();
    }

    // Return a random id: an integer drawn below 0xffffffff, written in
    // lowercase hex without zero padding (so one to eight digits).
    function genId() {
        var value = Math.floor(Math.random() * 0xffffffff);

        return value.toString(16);
    }

    // Log util.inspect() output for obj at info level, including
    // non-enumerable properties, down to `depth` levels (20 when depth is
    // not given or is 0).
    function dump(obj, depth) {
        var maxDepth = depth || 20;

        self.log.info(util.inspect(obj, true, maxDepth));
    }

    // Join all arguments with '.' to form a routing key.
    function pjoin() {
        var parts = Array.prototype.slice.call(arguments);

        return parts.join('.');
    }
}

/*
 * UrAgent.shutdown() stops a running UrAgent instance: the setup state poller
 * is cancelled, the request queue and the exchange are destroyed and the
 * connection is ended. Members that were never created are skipped. After
 * that the instance is ready to be destroyed.
 */
UrAgent.prototype.shutdown = function () {
    var agent = this;
    var poller = agent.broadcastSetupStateInterval;

    if (poller) {
        clearInterval(poller);
        agent.broadcastSetupStateInterval = null;
    }

    // Tear down in the reverse order of creation-dependence: the queue and
    // exchange first, the connection carrying them last.
    if (agent.queue) {
        agent.queue.destroy();
    }
    if (agent.exchange) {
        agent.exchange.destroy();
    }
    if (agent.connection) {
        agent.connection.end();
    }
};

/*
 * Run a single global UrAgent() if we're not imported elsewhere (i.e. this
 * file is being run directly).
 */
var main = function () {
    // Constructing the agent is all it takes to start it; the instance is
    // returned to the caller.
    return new UrAgent();
};

// Start an agent only when this file is the program's entry point; when it
// is loaded with require() the caller decides when to create one.
if (require.main === module) {
    main();
}

// The constructor is the module's only export.
module.exports = {
    UrAgent: UrAgent
};
